17.7 其他

本节介绍垃圾回收过程中使用的几种辅助结构。

并行任务框架

parfor关注的是任务分配和调度,其自身不具备执行能力。它将多个任务分组交给多个执行线程,然后在执行过程中重新平衡线程的任务分配,确保整个任务在最短时间内完成。

设置函数parforsetup用相关参数初始化desc状态,并完成任务分组。

parfor.go

func parforsetup(desc*parfor,nthr,n uint32,wait bool,body func(*parfor,uint32)) { desc.body=body // 任务函数 desc.nthr=nthr // 任务线程数量 desc.cnt=n // 任务数量 …

// 任务分组 for i:=range desc.thr{ begin:=uint32(uint64(n) *uint64(i) /uint64(nthr)) end:=uint32(uint64(n) *uint64(i+1) /uint64(nthr)) desc.thr[i].pos=uint64(begin) |uint64(end)<<32 } }

最后的循环语句将n个任务编号平分成nthr份,并将开始和结束位置保存到pos的高低位。以10任务5线程为例,thr[0]分到的任务单元就是[0,2)。

线程须主动调用parfordo来获取任务组,执行body任务函数。

parfor.go

func parfordo(desc*parfor) { // 为每个线程分配一个唯一序号 tid:=xadd(&desc.thrseq,1) -1

// 任务函数 body:=desc.body

// 如果只有单个线程,直接按顺序执行 if desc.nthr==1{ for i:=uint32(0);i<desc.cnt;i++ { body(desc,i) } return }

// 用线程序号提取任务组 me:= &desc.thr[tid] mypos:= &me.pos for{ // 先完成自身任务 for{ // 任务进度:直接累加pos低位的起始位置 pos:=xadd64(mypos,1)

     // 未超出任务组边界,执行 
    begin:=uint32(pos) -1
    end:=uint32(pos>>32) 
    if begin<end{ 
        body(desc,begin) 
        continue
     } 
    break
 } 

 // 提前完成工作,尝试从其他线程偷取任务 
idle:=false
for try:=uint32(0); ;try++ { 
     // 如多次行窃未果,那么准备打卡下班 
    if try>desc.nthr*4&& !idle{ 
        idle=true
        xadd(&desc.done,1) 
    } 

     // 如果其他线程都已完成工作,结束 
    extra:=uint32(0) 
    if!idle{ 
        extra=1
     } 
    if desc.done+extra==desc.nthr{ 
        if!idle{ 
            xadd(&desc.done,1) 
         } 
        goto exit
     } 

     // 随机挑选一个线程 
    var begin,end uint32
    victim:=fastrand1() % (desc.nthr-1) 
    if victim>=tid{ 
        victim++ 
     } 
    victimpos:= &desc.thr[victim].pos
    for{ 
         // 检查目标线程的当前任务进度 
        pos:=atomicload64(victimpos) 
        begin=uint32(pos) 
        end=uint32(pos>>32) 
        if begin+1>=end{ 
            end=0
            begin=end
            break
         } 

         // 有任务可偷,要忙起来了 
        if idle{ 
            xadd(&desc.done, -1) 
            idle=false
         } 

         // 将剩余任务偷一半(后半截) 
        begin2:=begin+ (end-begin)/2

         // 记得修改原主的任务结束值 
        newpos:=uint64(begin) |uint64(begin2)<<32
        if cas64(victimpos,pos,newpos) { 
            begin=begin2
            break
         } 
     } 

     // 将偷来的任务编号保存到自己pos里 
    if begin<end{ 
        atomicstore64(mypos,uint64(begin)|uint64(end)<<32) 
        me.nsteal++ 
        me.nstealcnt+=uint64(end) -uint64(begin) 

         // 跳出偷窃循环,进入外层循环重新执行自己的任务(尽管是偷来的) 
        break
     } 

     // 没偷到任务,就暂停或退出 ... 
 } 

} exit: … }

缓存队列

gcWork被设计来保存灰色对象,必须在保证并发安全的前提下,拥有足够高的性能。

mgcwork.go

type gcWork struct{ wbuf wbufptr }

该结构的真正核心是workbuf,gcWork不过是外层包装。workbuf作为无锁栈节点,其自身就是一个缓存容器(数组成员)。

mgcwork.go

type workbufhdr struct{ node lfnode nobj int }

type workbuf struct{ workbufhdr obj[(_WorkbufSize-unsafe.Sizeof(workbufhdr{})) /ptrSize]uintptr }

透过gcWork相关方法,我们可以观察workbuf是如何工作的。

mgc.go

var work struct{ full uint64 //lock-free list of full blocks workbuf empty uint64 //lock-free list of empty blocks workbuf partial uint64 //lock-free list of partially filled blocks workbuf }

mgcwork.go

func(ww*gcWork)put(obj uintptr) { w:= (*gcWork)(noescape(unsafe.Pointer(ww)))

// 从work.empty获取一个workbuf复用 wbuf:=w.wbuf.ptr() if wbuf==nil{ wbuf=getpartialorempty(42) w.wbuf=wbufptrOf(wbuf) }

// 直接将obj保存在workbuf.obj数组 wbuf.obj[wbuf.nobj] =obj wbuf.nobj++

// 如果数组填满,则将该数组移交给work.full // 本地obj=nil,下次put时获取一个复用对象填充 if wbuf.nobj==len(wbuf.obj) { putfull(wbuf,50) w.wbuf=0 } }

func putfull(b*workbuf,entry int) { lfstackpush(&work.full, &b.node) }

这种做法有点像内存分配器的cache,优先操作本地缓存,直到满足某个阈值再与全局交换。这么做,可以保证性能,避免直接操作全局队列;另一方面,从全局获取任务时,总是能一次性拿到一组。

就算是无锁数据结构,使用原子操作也会有性能损耗,尤其是在多核环境下。

这段代码,包括work全局变量,有很多C的影子,看上去有些别扭,完全不是Go的风格。如果是自动代码转换,那么下个版本是不是要对runtime里面很多违和的地方清理一下。

消费完毕的workbuf对象会被放回work.empty,以供复用。

mgcwork.go

func(ww*gcWork)get()uintptr{ w:= (*gcWork)(noescape(unsafe.Pointer(ww)))

// 从work.full获取一个workbuf对象 wbuf:=w.wbuf.ptr() if wbuf==nil{ wbuf=getfull(103) if wbuf==nil{ return 0 } w.wbuf=wbufptrOf(wbuf) }

// 直接从本地workbuf提取 wbuf.nobj— obj:=wbuf.obj[wbuf.nobj]

// 本地workbuf已空,将其放回work.empty供复用 if wbuf.nobj==0{ putempty(wbuf,115) w.wbuf=0 }

return obj }

func putempty(b*workbuf,entry int) { lfstackpush(&work.empty, &b.node) }

至于Free-Lock Stack的实现,也很简单利用CAS(Compare&Swap)指令来实现原子替换操作。这里用Node Pointer+Node.PushCount实现了Double-CAS。

lfstack.go

func lfstackpush(headuint64,nodelfnode) { // 累加计数器 node.pushcnt++

// 利用pointer+pushcnt获得唯一流水号 new:=lfstackPack(node,node.pushcnt)

// 逆向展开流水号,进行错误检查 if node1, _ :=lfstackUnpack(new);node1!=node{ throw(“lfstackpush”) }

// 类似自旋,重试直到成功 for{ // 原子读取原head node流水号(多核) old:=atomicload64(head)

 // 将当前node作为head
 // 未成功前,这个操作并不影响原stack
node.next=old

 // 利用CAS指令替换原head
 // 如替换失败,则循环重试 
if cas64(head,old,new) { 
    break
 } 

} }

func lfstackpop(head*uint64)unsafe.Pointer{ for{ // 原子读取stack head old:=atomicload64(head) if old==0{ return nil }

 // 展开流水号,获取pointer
node, _ :=lfstackUnpack(old) 

 // 利用CAS指令修改stack head
next:=atomicload64(&node.next) 
if cas64(head,old,next) { 
    return unsafe.Pointer(node) 
 } 

} }

如果CAS指令判断的仅是old指针地址,而该地址又被意外重用,那就会造成错误结果,这就是所谓的ABA问题。利用“指针地址+计数器”生成唯一流水号,实现Double-CAS,就能避开。

lfstack_amd64.go

func lfstackPack(node*lfnode,cnt uintptr)uint64{ return uint64(uintptr(unsafe.Pointer(node)))<<16|uint64(cnt&(1<<19-1)) }

内存状态统计

除了用GODEBUG=“gctrace=1”输出垃圾回收状态信息外,某些时候我们还需要自行获取内存相关统计数据。

与之相关的数据结构,分别是运行时内部使用的mstats和面向用户的MemStats。两者大部分结构相同,只是在输出结果上有细微调整。

mstats.go

type mstats struct{ alloc uint64 // 当前分配的object内存(含未回收的白色对象) total_alloc uint64 // 历史累计分配内存(当前正在使用和历次回收释放) sys uint64 // 当前从操作系统获取的内存(所有分配总和,不包括已释放) nmalloc uint64 // 分配次数累计 nfree uint64 // 释放次数累计

heap_alloc uint64 // 同alloc heap_sys uint64 // 从操作系统获取的内存(不包括已释放) heap_idle uint64 // 闲置span内存 heap_inuse uint64 // 正在使用span内存(从heap提取,包括stack) heap_released uint64 // 当前已归还操作系统的内存 heap_objects uint64 // 正在使用object数量(不含闲置链表)

stacks_inuse uint64 // 正在使用stack内存(含stackpool) mspan_inuse uint64 // 正在使用mspan内存 mcache_inuse uint64 // 正在使用mcache内存

next_gc uint64 // 下次垃圾回收阈值 last_gc uint64 // 上次垃圾回收结束时间(UnixNano,不包括并发清理) pause_total_ns uint64// 累计STW暂停时间 pause_ns [256]uint64 // 最近垃圾回收周期里STW暂停时间(循环缓冲区) pause_end [256]uint64// 最近垃圾回收周期里STW暂停结束时间(UnixNano) numgc uint32 // 垃圾回收次数 gc_cpu_fraction float64 //GC所耗CPU时间比例(f*100%)

heap_live uint64 // 自上次回收后堆使用内存(黑色+新分配,不包括白色对象) }

object特指cache分配的小块内存,以及large object,而非实际用户对象。

用户通过runtime.ReadMemStats函数来获取统计数据。

mstats.go

func ReadMemStats(m*MemStats) { stopTheWorld(“read mem stats”)

systemstack(func() { readmemstats_m(m) })

startTheWorld() }

func readmemstats_m(stats*MemStats) { updatememstats(nil)

// 前面部分数据结构相同,直接拷贝 memmove(unsafe.Pointer(stats),unsafe.Pointer(&memstats),sizeof_C_MStats)

// 将栈内存从统计数据剔除,仅显示用户逻辑消耗 stats.StackSys+=stats.StackInuse stats.HeapInuse-=stats.StackInuse stats.HeapSys-=stats.StackInuse }

注意:ReadMemStats会进行STW操作,应控制调用时间和次数。

监控输出示例。

                heap_idle         heap_released
                 |                   | 

scvg0:inuse:3,idle:1,sys:5,released:0,consumed:5(MB) | | | heap_inuse heap_sys heap_sys-heap_released